package com.flink.examples.elasticsearch;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @Description Writes a DataStream to Elasticsearch.
 *
 * <p>A single sample {@code TUser} is serialized to JSON, pushed through a Flink
 * stream and indexed into Elasticsearch through the Elasticsearch 6 connector.
 *
 * <p>Official documentation:
 * https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
 *
 * @Author JL
 * @Date 2020/09/18
 * @Version V1.0
 */
public class DataStreamSink {

    /** Shared Gson instance, created once instead of once per record. */
    private static final Gson GSON = new Gson();

    /** Elasticsearch node the sink connects to. */
    private static final String ES_HOST = "192.168.110.35";
    private static final int ES_PORT = 9200;
    private static final String ES_SCHEME = "http";

    /** Target index and mapping type of the indexed documents. */
    private static final String INDEX_NAME = "flink_demo";
    private static final String DOC_TYPE = "doc";

    /**
     * Builds and runs the job: sample user -> JSON string -> Elasticsearch sink.
     *
     * @param args command line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(1);

        // 1. Configure the Elasticsearch connection and the sink that creates index requests.
        ElasticsearchSink.Builder<String> esSinkBuilder = createSinkBuilder();

        // 2. Put the sample data into the stream, serialized as JSON.
        DataStream<String> input = env.fromElements(sampleUser())
                .map((MapFunction<TUser, String>) value -> GSON.toJson(value));

        // 3. Write the stream to Elasticsearch.
        input.addSink(esSinkBuilder.build());
        env.execute("flink data to es job");
    }

    /**
     * Creates the Elasticsearch sink builder, including its bulk flush settings.
     *
     * @return a builder for a sink that indexes each incoming JSON string as one document
     */
    private static ElasticsearchSink.Builder<String> createSinkBuilder() {
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(ES_HOST, ES_PORT, ES_SCHEME));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String user, RuntimeContext ctx, RequestIndexer indexer) {
                        System.out.println(user);
                        // The element is already a JSON document, so hand it to Elasticsearch as-is.
                        // Round-tripping it through Gson into a Map would turn every number into a
                        // Double (id 9 -> 9.0, epoch millis -> floating point) and change the values
                        // and the dynamically mapped field types in the index.
                        indexer.add(Requests.indexRequest()
                                .index(INDEX_NAME)
                                .type(DOC_TYPE)
                                .source(user, XContentType.JSON));
                    }
                }
        );
        // Maximum number of actions per bulk request; 1 makes the sink emit after every
        // element, otherwise elements would be buffered.
        esSinkBuilder.setBulkFlushMaxActions(1);
        // Maximum size of buffered data (in MB) before a flush.
        esSinkBuilder.setBulkFlushMaxSizeMb(50);
        // Interval at which to flush regardless of the number or size of buffered actions.
        esSinkBuilder.setBulkFlushInterval(400);
        return esSinkBuilder;
    }

    /**
     * Creates the single demo record that is sent through the stream.
     *
     * @return a populated sample user
     */
    private static TUser sampleUser() {
        TUser user = new TUser();
        user.setId(9);
        user.setName("wang4");
        user.setAge(23);
        user.setSex(1);
        user.setAddress("CN");
        user.setCreateTimeSeries(System.currentTimeMillis());
        return user;
    }

}
